package com.proxy.server.backend.nio;

import com.proxy.common.constant.CapabilitiesType;
import com.proxy.common.constant.PacketType;
import com.proxy.common.exception.ProxyIoException;
import com.proxy.common.packet.*;
import com.proxy.common.utils.CharsetUtil;
import com.proxy.common.utils.SecurityUtil;
import com.proxy.server.backend.BackendConnection;
import com.proxy.server.backend.nio.adapter.BackendPacketDecoder;
import com.proxy.server.backend.nio.adapter.BackendPacketEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Created by liufish on 16/8/2.
 */
/**
 * A single asynchronous (Netty) connection from the proxy to a backend MySQL node.
 *
 * <p>The constructor starts connecting immediately. The handshake is driven by the
 * inbound handler installed on the channel; callers block in {@link #register()}
 * until the handshake has succeeded, failed or timed out.
 *
 * <p>Status life cycle: {@link #STATUS_CREATE} -> {@link #STATUS_UN_HANDSHAKE}
 * -> (optionally {@link #STATUS_HALF_HANDSHAKE}) -> {@link #STATUS_HANDSHAKE},
 * or one of the two failure states.
 */
public class NioBackendConnection implements BackendConnection {

    /** Initial state: waiting for the server's greeting (handshake) packet. */
    public static final int STATUS_CREATE = 0;
    /** The auth packet has been sent; waiting for the server's verdict. */
    public static final int STATUS_UN_HANDSHAKE = 1;
    /** The server asked for the old (323) password scheme; waiting for its verdict. */
    public static final int STATUS_HALF_HANDSHAKE = 2;
    /** Authentication succeeded; packets are forwarded to the receiver. */
    public static final int STATUS_HANDSHAKE = 3;
    /** Authentication failed, or the channel was unregistered/closed. */
    public static final int STATUS_FAIL_HANDSHAKE = -1;
    /** Authentication failed during the 323 exchange. */
    public static final int STATUS_FAIL_HALF_HANDSHAKE = -2;

    /** Owning client; provides the backend node configuration. */
    private final NioBackendClient client;

    /** Schema (database) this connection is bound to. */
    private final String schema;

    /** Whether auto-commit is enabled. */
    private volatile boolean autoCommit;

    /** Transaction isolation level. */
    private volatile int txIsolation;

    /** Whether the current transaction has been interrupted. */
    private volatile boolean txInterrupted;

    /** Charset name used on the database side. */
    private String dbCharset;

    /** Charset name used for Java-side string encoding/decoding. */
    private String charset;

    /** Numeric charset index. */
    private int charsetIndex;

    /**
     * Netty channel. Assigned on the event loop in {@code channelRegistered} and read
     * from caller threads, hence volatile. It is {@code null} until registration.
     */
    private volatile Channel channel;

    /** Current handshake status; one of the {@code STATUS_*} constants. */
    private volatile int status = STATUS_CREATE;

    /** Seed taken from the server greeting, kept because the handshake is asynchronous. */
    private byte[] seed;

    /** Event loop dedicated to this connection; released when the connection ends. */
    private final EventLoopGroup eventLoopGroup;

    /** Netty client bootstrap. */
    private Bootstrap bootstrap;

    /** Released once the handshake reaches a terminal state (success or failure). */
    private final CountDownLatch handshakeWaiter = new CountDownLatch(1);

    /** Receiver for packets that arrive after authentication. */
    private final AsyncReceiver receiver = new AsyncReceiver();

    /** True while a heartbeat query is in flight and its reply is being consumed. */
    private final AtomicBoolean isHeartBeating = new AtomicBoolean(false);

    /** Time of the last outbound write, used to decide whether a heartbeat is due. */
    private final AtomicLong lastActiveTime = new AtomicLong(System.currentTimeMillis());

    /** True once the first EOF packet of a heartbeat reply has been seen. */
    private volatile boolean receiveHeadEof = false;


    /**
     * Creates the connection and starts connecting to the node configured on {@code client}.
     *
     * @param client owning client whose node supplies host, port, credentials and schema
     */
    public NioBackendConnection(NioBackendClient client) {
        this.client = client;
        this.schema = this.client.getNode().getSchema();
        this.eventLoopGroup = new NioEventLoopGroup(1);
        _initBootstrap();
    }

    public String getSchema() {
        return schema;
    }

    public boolean isAutoCommit() {
        return autoCommit;
    }

    public void setAutoCommit(boolean autoCommit) {
        this.autoCommit = autoCommit;
    }

    public int getTxIsolation() {
        return txIsolation;
    }

    public void setTxIsolation(int txIsolation) {
        this.txIsolation = txIsolation;
    }

    public boolean isTxInterrupted() {
        return txInterrupted;
    }

    public void setTxInterrupted(boolean txInterrupted) {
        this.txInterrupted = txInterrupted;
    }

    public String getDbCharset() {
        return dbCharset;
    }

    public void setDbCharset(String dbCharset) {
        this.dbCharset = dbCharset;
    }

    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public int getCharsetIndex() {
        return charsetIndex;
    }

    public void setCharsetIndex(int charsetIndex) {
        this.charsetIndex = charsetIndex;
    }

    /**
     * @return {@code true} if the channel exists and is active; {@code false} if the
     *         channel has not been registered yet or is no longer active
     */
    public boolean isActive() {
        final Channel ch = this.channel;
        return ch != null && ch.isActive();
    }

    public int getStatus() {
        return status;
    }

    /**
     * Updates the handshake status and, when the new status is terminal
     * (success or either failure), releases threads blocked in {@link #register()}.
     *
     * @param status one of the {@code STATUS_*} constants
     */
    public void setStatus(int status) {
        this.status = status;
        if (status == STATUS_FAIL_HANDSHAKE
                || status == STATUS_FAIL_HALF_HANDSHAKE
                || status == STATUS_HANDSHAKE) {
            handshakeWaiter.countDown();
        }
    }

    public byte[] getSeed() {
        return seed;
    }

    public void setSeed(byte[] seed) {
        this.seed = seed;
    }

    /**
     * Closes the channel (if one was ever registered) and releases the event loop
     * that was created for this connection. Safe to call more than once.
     */
    public void close() {
        final Channel ch = this.channel;
        if (ch != null) {
            ch.close();
        }
        shutdownEventLoop();
    }

    public AsyncReceiver getReceiver() {
        return receiver;
    }

    /**
     * Writes and flushes a raw buffer to the backend and records the activity time.
     *
     * @param buffer data to send
     */
    public void write(ByteBuf buffer) {
        this.channel.writeAndFlush(buffer);
        lastActiveTime.set(System.currentTimeMillis());
    }

    /**
     * Writes and flushes a packet to the backend and records the activity time,
     * consistently with {@link #write(ByteBuf)}.
     *
     * @param packet packet to send
     */
    public void write(MysqlPacket packet) {
        this.channel.writeAndFlush(packet);
        lastActiveTime.set(System.currentTimeMillis());
    }

    /**
     * Blocks until the handshake reaches a terminal state or the timeout
     * (four times the node's connect timeout, in milliseconds) elapses.
     *
     * @return {@code true} if the connection is authenticated; {@code false} if the
     *         handshake failed or did not finish within the timeout
     * @throws ProxyIoException if the waiting thread is interrupted
     */
    public boolean register() {
        if (status == STATUS_HANDSHAKE) {
            return true;
        }
        if (status == STATUS_FAIL_HANDSHAKE || status == STATUS_FAIL_HALF_HANDSHAKE) {
            return false;
        }
        try {
            // await() returns false on timeout; give up instead of waiting again forever.
            if (!handshakeWaiter.await(client.getNode().getConnectTimeout() * 4, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (InterruptedException ex) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new ProxyIoException(ex);
        }
        return status == STATUS_HANDSHAKE;
    }


    /**
     * Sends a heartbeat query if the connection is authenticated and has been idle
     * for at least the node's heartbeat interval. While the heartbeat is in flight,
     * inbound packets are consumed as the heartbeat reply instead of being forwarded.
     */
    public void heartbeat() {
        // A heartbeat before authentication would make the handler swallow handshake packets.
        if (status != STATUS_HANDSHAKE) {
            return;
        }
        // Skip when the connection was active more recently than the heartbeat interval.
        if (System.currentTimeMillis() - lastActiveTime.get() < client.getNode().getHeartbeatTime()) {
            return;
        }
        isHeartBeating.set(true);
        // update xdual set x=now() / select 1 from dual
        CommandPacket packet = new CommandPacket(0, PacketType.COM_QUERY,
                "UPDATE XDUAL SET X=NOW()".getBytes(Charset.forName("UTF-8")));
        this.channel.writeAndFlush(packet);
    }


    /**
     * Configures the bootstrap, installs the codec and handshake handlers, and starts
     * connecting to the backend node.
     */
    private void _initBootstrap() {
        this.bootstrap = new Bootstrap();
        // SO_BACKLOG is intentionally not set: it applies to server sockets only.
        this.bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.SO_SNDBUF, 65535)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_RCVBUF, 65535);
        this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new BackendPacketDecoder());
                socketChannel.pipeline().addLast(new BackendPacketEncoder());
                socketChannel.pipeline().addLast(new BackendInboundHandler());
            }
        });

        InetSocketAddress address =
                new InetSocketAddress(client.getNode().getHost(), client.getNode().getPort());
        bootstrap.connect(address).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // Make sure waiters are released even if the channel never registered.
                    setStatus(STATUS_FAIL_HANDSHAKE);
                    shutdownEventLoop();
                }
            }
        });
    }

    /** Releases the per-connection event loop; a no-op if shutdown has already begun. */
    private void shutdownEventLoop() {
        if (!eventLoopGroup.isShuttingDown()) {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Consumes one packet of a heartbeat reply. An OK (0x00) or ERR (0xff) packet ends
     * the heartbeat immediately; otherwise the heartbeat ends on the second EOF (0xfe)
     * packet. Any other packet is dropped.
     */
    private void consumeHeartbeatReply(BinaryPacket bin) {
        final byte marker = bin.body[0];
        if (marker == (byte) 0x00 || marker == (byte) 0xff) {
            receiveHeadEof = false;
            isHeartBeating.set(false);
        } else if (marker == (byte) 0xfe) {
            if (receiveHeadEof) {
                // Second EOF: the reply is complete.
                receiveHeadEof = false;
                isHeartBeating.set(false);
            } else {
                // First EOF: keep consuming until the closing EOF arrives.
                receiveHeadEof = true;
            }
        }
    }

    /**
     * Handles the server greeting: remembers the seed, builds the auth packet with a
     * scramble411 password and sends it, moving to {@link #STATUS_UN_HANDSHAKE}.
     *
     * @throws Exception whatever the packet classes raise; left to the Netty pipeline
     */
    private void answerServerGreeting(ChannelHandlerContext ctx, BinaryPacket bin) throws Exception {
        HandshakePacket handshakePacket = new HandshakePacket();
        handshakePacket.read(bin);

        AuthPacket authPacket = new AuthPacket();
        authPacket.packetId = ++handshakePacket.packetId;
        authPacket.clientFlags = CapabilitiesType.getServerCapabilities();
        authPacket.maxPacketSize = 1024 * 1024 * 16; //16M
        authPacket.charsetIndex = handshakePacket.serverCharsetIndex;
        authPacket.user = (client.getNode().getUser());

        final byte[] serverSeed = handshakePacket.seed;
        setSeed(serverSeed);

        int serverCharsetIndex = handshakePacket.serverCharsetIndex;
        // NOTE(review): unlike the 323 path, a null password is not guarded here - confirm
        // that the node configuration always supplies one.
        byte[] password = client.getNode().getPassword()
                .getBytes(Charset.forName(CharsetUtil.getCharset(serverCharsetIndex)));

        // The scramble input is the seed followed by the rest of the scramble buffer.
        byte[] restOfScramble = handshakePacket.restOfScrambleBuff;
        byte[] authSeed = new byte[serverSeed.length + restOfScramble.length];
        System.arraycopy(serverSeed, 0, authSeed, 0, serverSeed.length);
        System.arraycopy(restOfScramble, 0, authSeed, serverSeed.length, restOfScramble.length);

        authPacket.password = SecurityUtil.scramble411(password, authSeed);
        authPacket.database = client.getNode().getSchema();
        setStatus(STATUS_UN_HANDSHAKE);
        ctx.writeAndFlush(authPacket);
    }

    /**
     * Handles the server's answer to the auth packet: OK, ERR, or a request to
     * switch to the old 323 password scheme.
     *
     * @throws Exception whatever the packet classes raise; left to the Netty pipeline
     */
    private void handleAuthResult(ChannelHandlerContext ctx, BinaryPacket bin) throws Exception {
        switch (bin.body[0]) {
            case 0x00:
                // success
                setStatus(STATUS_HANDSHAKE);
                break;
            case (byte) 0xff:
                // failure
                setStatus(STATUS_FAIL_HANDSHAKE);
                ctx.close();
                break;
            case (byte) 0xfe: {
                // The server requests the 323 authentication reply.
                Reply323Packet reply323Packet = new Reply323Packet();
                reply323Packet.read(bin);
                if (client.getNode().getPassword() != null && client.getNode().getPassword().length() > 0) {
                    byte[] seed323 = SecurityUtil.scramble323(client.getNode().getPassword(), new String(getSeed())).getBytes();
                    reply323Packet.seed = seed323;
                }
                // NOTE(review): the buffer filled here is never passed to a write call in this
                // class, so the reply may never reach the server (and the buffer may leak) -
                // confirm what Reply323Packet.writeBuffer does.
                reply323Packet.writeBuffer(ctx.channel().alloc().buffer());
                setStatus(STATUS_HALF_HANDSHAKE);
                break;
            }
            default:
                break;
        }
    }

    /** Handles the server's verdict on the 323 reply: OK or ERR; other packets are ignored. */
    private void handleOldPasswordAuthResult(ChannelHandlerContext ctx, BinaryPacket bin) {
        switch (bin.body[0]) {
            case 0x00:
                // success
                setStatus(STATUS_HANDSHAKE);
                break;
            case (byte) 0xff:
                // failure
                setStatus(STATUS_FAIL_HALF_HANDSHAKE);
                ctx.close();
                break;
            default:
                break;
        }
    }

    /**
     * Last inbound handler of the pipeline: tracks the channel life cycle and routes
     * decoded packets by heartbeat state and handshake status.
     */
    private final class BackendInboundHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            channel = ctx.channel();
            setStatus(STATUS_CREATE);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            ctx.channel().close();
            setStatus(STATUS_FAIL_HANDSHAKE);
            // The connection cannot be reused after this point; free its event loop.
            shutdownEventLoop();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final BinaryPacket bin = (BinaryPacket) msg;

            // While a heartbeat is in flight every packet belongs to its reply.
            if (isHeartBeating.get()) {
                consumeHeartbeatReply(bin);
                return;
            }

            switch (status) {
                case STATUS_HANDSHAKE:
                    // Regular traffic: by far the most common case.
                    getReceiver().receive(bin);
                    break;
                case STATUS_CREATE:
                    answerServerGreeting(ctx, bin);
                    break;
                case STATUS_UN_HANDSHAKE:
                    handleAuthResult(ctx, bin);
                    break;
                case STATUS_HALF_HANDSHAKE:
                    handleOldPasswordAuthResult(ctx, bin);
                    break;
                default:
                    // MySQL is a trusted peer; packets in any other state are ignored.
                    break;
            }
        }
    }


}
